home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
PC World Komputer 2010 April
/
PCWorld0410.iso
/
hity wydania
/
Ubuntu 9.10 PL
/
karmelkowy-koliberek-desktop-9.10-i386-PL.iso
/
casper
/
filesystem.squashfs
/
usr
/
lib
/
system-service
/
system-service-d
Wrap
Text File
|
2009-10-15
|
20KB
|
510 lines
#!/usr/bin/python
# (c) 2008 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import fcntl
import os
import re
import struct
import subprocess
import sys

import apt_pkg
import dbus
import dbus.mainloop.glib
import dbus.service
import gobject

from UbuntuSystemService.utils import *
class UnknownProxyTypeError(dbus.DBusException):
    """Raised when an unknown proxy type was passed."""
class InvalidKeyboardTypeError(dbus.DBusException):
    """Raised when an invalid keyboard configuration was set."""
class PermissionDeniedError(dbus.DBusException):
    """Raised when the request was denied by policy."""
class ServiceBackend(dbus.service.Object):
    """
    The main backend class that supports various system settings like
    proxy and keyboard.

    An instance claims DBUS_INTERFACE_NAME on the system bus and exports
    itself at the object path "/". The exported methods that change the
    system (and the package lock query) first ask PolicyKit whether the
    calling D-Bus sender is authorized.
    """

    # some class properties
    DBUS_INTERFACE_NAME = "com.ubuntu.SystemService"
    SUPPORTED_PROXIES = ("http","ftp", "https", "socks")
    # privilege that the set_* methods accept as a fallback
    FALLBACK_PRIVILEGE = "org.gnome.gconf.defaults.set-system"

    # default files
    ETC_ENVIRONMENT = "/etc/environment"
    CONSOLE_SETUP_DEFAULT = "/etc/default/console-setup"
    DPKG_LOCK = "/var/lib/dpkg/lock"

    def __init__(self):
        """Claim the service name on the system bus and init apt_pkg."""
        bus_name = dbus.service.BusName(self.DBUS_INTERFACE_NAME,
                                        bus=dbus.SystemBus())
        dbus.service.Object.__init__(self, bus_name, '/')
        apt_pkg.InitConfig()

    # authorization helpers ------------------------------------------
    def _authWithPolicyKit(self, sender, connection, priv):
        """
        Ask PolicyKit whether "sender" is authorized for privilege "priv".

        sender     -- the D-Bus name of the caller
        connection -- the connection the call arrived on (used to look
                      up the process id of the sender)
        priv       -- the PolicyKit action id to check

        Returns the "is authorized" flag of CheckAuthorization.
        """
        system_bus = dbus.SystemBus()
        obj = system_bus.get_object("org.freedesktop.PolicyKit1",
                                    "/org/freedesktop/PolicyKit1/Authority",
                                    "org.freedesktop.PolicyKit1.Authority")
        policykit = dbus.Interface(obj, "org.freedesktop.PolicyKit1.Authority")
        info = dbus.Interface(connection.get_object('org.freedesktop.DBus',
                                                    '/org/freedesktop/DBus/Bus',
                                                    False),
                              'org.freedesktop.DBus')
        pid = info.GetConnectionUnixProcessID(sender)
        # NOTE(review): the subject is identified by pid only; a
        # "system-bus-name" subject may be the more robust choice --
        # confirm against the PolicyKit documentation before changing
        subject = ('unix-process',
                   { 'pid' : dbus.UInt32(pid, variant_level=1) }
                  )
        details = { '' : '' }
        flags = dbus.UInt32(1) # AllowUserInteraction = 0x00000001
        cancel_id = ''
        (ok, notused, result_details) = policykit.CheckAuthorization(
            subject, priv, details, flags, cancel_id)
        return ok

    def _require_auth(self, sender, conn, *privileges):
        """
        Internal helper that returns if "sender" is authorized for at
        least one of "privileges" (checked in order, later ones are
        only tried if the earlier ones were denied) and raises
        PermissionDeniedError otherwise.
        """
        for priv in privileges:
            if self._authWithPolicyKit(sender, conn, priv):
                return
        raise PermissionDeniedError("Permission denied by policy")

    # file helpers ---------------------------------------------------
    def _read_lines(self, path):
        """Internal helper that returns the lines of "path" as a list."""
        with open(path) as f:
            return f.readlines()

    def _write_file(self, path, content, mode="w"):
        """Internal helper that writes "content" to "path" and closes it."""
        with open(path, mode) as f:
            f.write(content)

    # proxy stuff ---------------------------------------------------
    def _etc_environment_proxy(self, proxy_type):
        """
        Internal helper that returns the proxy_type proxy configured in
        /etc/environment (or "" if there is none).
        """
        if not os.path.exists(self.ETC_ENVIRONMENT):
            return ""
        prefix = "%s_proxy=" % proxy_type
        for line in self._read_lines(self.ETC_ENVIRONMENT):
            if line.startswith(prefix):
                # split only once, the value itself may contain a "="
                value = line.strip().split("=", 1)[1]
                return value.strip('"')
        return ""

    def _http_proxy(self):
        """Internal helper that returns the current http proxy."""
        # FIXME: the apt proxy is not consulted; what to do if both
        #        proxies are different?
        return self._etc_environment_proxy("http")

    def _apt_proxy(self, proxy_type):
        """Internal helper that returns the configured apt proxy."""
        apt_pkg.InitConfig()
        return apt_pkg.Config.Find("Acquire::%s::proxy" % proxy_type)

    def _ftp_proxy(self):
        """Internal helper that returns the current ftp proxy."""
        # FIXME: the apt proxy is not consulted; what to do if both
        #        proxies are different?
        return self._etc_environment_proxy("ftp")

    def _socks_proxy(self):
        """Internal helper that returns the current socks proxy."""
        return self._etc_environment_proxy("socks")

    def _ftp_apt_proxy(self):
        """Internal helper that returns the configured apt ftp proxy."""
        return self._apt_proxy("ftp")

    def _https_proxy(self):
        """Internal helper that returns the current https proxy."""
        return self._etc_environment_proxy("https")

    def _verify_proxy(self, proxy_type, proxy):
        """Internal helper, verify that the proxy string is valid."""
        return verify_proxy(proxy_type, proxy)

    def _verify_no_proxy(self, proxy):
        """Internal helper, verify that the no_proxy string is valid."""
        return verify_no_proxy(proxy)

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='s',
                         out_signature='s',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def get_proxy(self, proxy_type, sender=None, conn=None):
        """
        Get the current system-wide proxy for type "proxy_type"

        The value is read from /etc/environment; an empty string is
        returned if no proxy of that type is set there. Raises
        UnknownProxyTypeError for an unsupported proxy_type.
        """
        getters = {
            "http": self._http_proxy,
            "https": self._https_proxy,
            "ftp": self._ftp_proxy,
            "socks": self._socks_proxy,
        }
        if proxy_type not in getters:
            raise UnknownProxyTypeError(
                "proxy_type '%s' is unknown in get_proxy" % proxy_type)
        return getters[proxy_type]()

    def _apt_conffiles(self):
        """
        Internal helper that returns the existing apt configuration
        files: apt.conf first, then the files from apt.conf.d.
        """
        confdir = apt_pkg.Config.FindDir("Dir::Etc")
        conffiles = [os.path.join(confdir, "apt.conf")]
        partsdir = os.path.join(confdir, "apt.conf.d")
        if os.path.isdir(partsdir):
            conffiles += [os.path.join(partsdir, n)
                          for n in sorted(os.listdir(partsdir))]
        # apt.conf is optional and apt.conf.d may contain directories,
        # only return what can actually be opened as a file
        return [f for f in conffiles if os.path.isfile(f)]

    def _write_apt_proxy(self, proxy_type, new_proxy):
        """
        Helper that writes the new apt proxy.

        Returns False if new_proxy does not verify or if apt reports a
        proxy but no configuration line could be found to replace.
        """
        if not self._verify_proxy(proxy_type, new_proxy):
            return False
        new_line = 'Acquire::%s::proxy "%s";\n' % (proxy_type, new_proxy)
        # check for the easy case (no proxy setting in the config)
        if self._apt_proxy(proxy_type) == "":
            confdir = apt_pkg.Config.FindDir("Dir::Etc")
            self._write_file(os.path.join(confdir, "apt.conf"), new_line, "a")
            return True
        # now the difficult case (search the apt configuration files)
        prefix = "acquire::%s::proxy" % proxy_type
        for conffile in self._apt_conffiles():
            found = False
            new_content = []
            for line in self._read_lines(conffile):
                if line.lower().startswith(prefix):
                    found = True
                    line = new_line
                    # FIXME: scan for more complicated forms of the proxy
                    #        settings and/or scan for the proxy string and
                    #        just replace this
                new_content.append(line)
            # if we found/replaced the proxy, write it out now
            if found:
                self._write_file(conffile, "".join(new_content))
                return True
        return False

    def _clear_apt_proxy(self, proxy_type):
        """
        Helper that clears the apt proxy by removing the matching lines
        from every apt configuration file. Always returns True.
        """
        prefix = "acquire::%s::proxy" % proxy_type
        for conffile in self._apt_conffiles():
            lines = self._read_lines(conffile)
            kept = [l for l in lines if not l.lower().startswith(prefix)]
            # only touch the files that actually had a proxy line
            if len(kept) != len(lines):
                self._write_file(conffile, "".join(kept))
        return True

    def _update_etc_environment(self, key, new_line=None):
        """
        Internal helper that rewrites /etc/environment.

        Every line that starts with "key=" is replaced with new_line
        (or removed if new_line is None). If there is no such line,
        new_line is appended. A missing file is treated as empty.
        Always returns True.
        """
        prefix = "%s=" % key
        lines = []
        if os.path.exists(self.ETC_ENVIRONMENT):
            lines = self._read_lines(self.ETC_ENVIRONMENT)
        found = False
        new_content = []
        for line in lines:
            if line.startswith(prefix):
                found = True
                if new_line is None:
                    continue
                line = new_line
            new_content.append(line)
        if found:
            self._write_file(self.ETC_ENVIRONMENT, "".join(new_content))
        elif new_line is not None:
            # do not glue the new setting onto an unterminated last line
            if lines and not lines[-1].endswith("\n"):
                new_line = "\n" + new_line
            self._write_file(self.ETC_ENVIRONMENT, new_line, "a")
        return True

    def _write_etc_environment_proxy(self, proxy_type, new_proxy):
        """
        Helper that sets <proxy_type>_proxy in /etc/environment.
        Returns False if new_proxy does not verify, True otherwise.
        """
        if not self._verify_proxy(proxy_type, new_proxy):
            return False
        key = "%s_proxy" % proxy_type
        return self._update_etc_environment(
            key, '%s="%s"\n' % (key, new_proxy))

    def _clear_etc_environment_proxy(self, proxy_type):
        """Helper that removes <proxy_type>_proxy from /etc/environment."""
        return self._update_etc_environment("%s_proxy" % proxy_type)

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='ss',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def set_proxy(self, proxy_type, new_proxy, sender=None, conn=None):
        """
        Set a new system-wide proxy that looks like e.g.:
        http://proxy.host.net:port/

        This function will set a new apt configuration and
        modify /etc/environment. An empty new_proxy clears the
        setting. Returns True if both were updated.

        Raises PermissionDeniedError if the caller is not authorized
        and UnknownProxyTypeError for an unsupported proxy_type.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.setproxy",
                           self.FALLBACK_PRIVILEGE)
        # check if something supported is set
        if proxy_type not in self.SUPPORTED_PROXIES:
            raise UnknownProxyTypeError(
                "proxy_type '%s' is unknown in set_proxy" % proxy_type)
        # set (or reset)
        if not new_proxy:
            res = self._clear_apt_proxy(proxy_type)
            res &= self._clear_etc_environment_proxy(proxy_type)
        else:
            res = self._write_apt_proxy(proxy_type, new_proxy)
            res &= self._write_etc_environment_proxy(proxy_type, new_proxy)
        return res

    def _clear_etc_environment_no_proxy(self):
        """Helper that removes no_proxy from /etc/environment."""
        return self._update_etc_environment("no_proxy")

    def _write_etc_environment_no_proxy(self, new_proxy):
        """
        Helper that sets no_proxy in /etc/environment.
        Returns False if new_proxy does not verify, True otherwise.
        """
        if not self._verify_no_proxy(new_proxy):
            return False
        return self._update_etc_environment(
            "no_proxy", 'no_proxy="%s"\n' % new_proxy)

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='s',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def set_no_proxy(self, new_no_proxy, sender=None, conn=None):
        """
        Set a new system-wide no_proxy list that looks like e.g.:
        localhost,foo.com

        This function will modify /etc/environment. An empty
        new_no_proxy clears the setting.

        Raises PermissionDeniedError if the caller is not authorized.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.setnoproxy",
                           self.FALLBACK_PRIVILEGE)
        # set (or reset)
        if not new_no_proxy:
            return self._clear_etc_environment_no_proxy()
        return self._write_etc_environment_no_proxy(new_no_proxy)

    # keyboard stuff ---------------------------------------------------
    def _get_keyboard_from_etc(self):
        """
        helper that reads /etc/default/console-setup and gets the
        keyboard settings there

        Returns the tuple (model, layout, variant, options); settings
        that are not in the file are returned as "".
        """
        settings = {
            "XKBMODEL": "",
            "XKBLAYOUT": "",
            "XKBVARIANT": "",
            "XKBOPTIONS": "",
        }
        for line in self._read_lines(self.CONSOLE_SETUP_DEFAULT):
            # partition at the first "=" so that values containing a
            # "=" are not truncated
            (key, sep, value) = line.partition("=")
            if sep and key in settings:
                settings[key] = value.strip('"\n')
        return (settings["XKBMODEL"], settings["XKBLAYOUT"],
                settings["XKBVARIANT"], settings["XKBOPTIONS"])

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='',
                         out_signature='ssss',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def get_keyboard(self, sender=None, conn=None):
        """
        Get the current keyboard configuration. This returns four
        strings: (model, layout, variant, options)
        """
        return self._get_keyboard_from_etc()

    def _set_keyboard_to_etc(self, model, layout, variant, options):
        """
        helper that writes /etc/default/console-setup

        Only existing XKB* lines are replaced; the file is written to
        a ".new" file and renamed into place, and only if something
        changed. Always returns True.
        """
        # FIXME: what to do if not os.path.exists(self.CONSOLE_SETUP_DEFAULT)
        settings = (("XKBMODEL", model),
                    ("XKBLAYOUT", layout),
                    ("XKBVARIANT", variant),
                    ("XKBOPTIONS", options))
        original = self._read_lines(self.CONSOLE_SETUP_DEFAULT)
        content = []
        for line in original:
            for (key, value) in settings:
                if line.startswith(key + "="):
                    line = '%s="%s"\n' % (key, value)
                    break
            content.append(line)
        # if something changed, write
        if content != original:
            new_file = self.CONSOLE_SETUP_DEFAULT + ".new"
            self._write_file(new_file, "".join(content))
            os.rename(new_file, self.CONSOLE_SETUP_DEFAULT)
        return True

    def _verify_keyboard_settings(self, model, layout, variant, options):
        """
        helper that verifies the settings

        Returns True if all four strings only contain whitelisted
        characters and "ckbcomp" exits with 0 for them.
        """
        # check against char whitelist; \Z (not $) so that a trailing
        # newline is rejected as well, the values end up in a config file
        allowed = r"^[0-9a-zA-Z:,_]*\Z"
        for s in (model, layout, variant, options):
            if not re.match(allowed, s):
                return False
        # check if 'ckbcomp' can compile it
        cmd = ["ckbcomp"]
        if model:
            cmd += ["-model", model]
        if layout:
            cmd += ["-layout", layout]
        if variant:
            cmd += ["-variant", variant]
        if options:
            cmd += ["-option", options]
        # the output is not needed, only the exit status; devnull
        # has to be opened for writing to be usable as stdout
        with open(os.devnull, "w") as devnull:
            ret = subprocess.call(cmd, stdout=devnull)
        return (ret == 0)

    def _run_setupcon(self):
        """
        helper that runs setupcon to activate the settings, taken from
        oem-config (/usr/lib/oem-config/console/console-setup-apply)

        Returns True if setupcon exited with 0.
        """
        ret = subprocess.call(["setupcon","--save-only"])
        # started without waiting for it; its exit status is not checked
        subprocess.Popen(["/usr/sbin/update-initramfs","-u"])
        return (ret == 0)

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='ssss',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def set_keyboard(self, model, layout, variant, options, sender=None, conn=None):
        """
        Set the system default keyboard configuration.

        It expects four input arguments (strings):
        model -- the model (evdev, pc105, ...)
        layout -- the layout (de, us, ...)
        variant -- the variant (nodeadkeys, ..)
        options -- keyboard options (nocaps, ...)

        It returns True on success.

        Raises PermissionDeniedError if the caller is not authorized
        and InvalidKeyboardTypeError if the settings do not verify.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.setkeyboard",
                           self.FALLBACK_PRIVILEGE)
        # if no keyboard model is set, try to guess one
        # this is based on the "console-setup.config" code that
        # defaults to pc105
        if not model:
            guessed = {"us": "pc104", "br": "abnt2", "jp": "jp106"}
            model = guessed.get(layout, "pc105")
        # verify the settings
        if not self._verify_keyboard_settings(model, layout, variant, options):
            raise InvalidKeyboardTypeError("Invalid keyboard set")
        # apply
        if not self._set_keyboard_to_etc(model, layout, variant, options):
            return False
        if not self._run_setupcon():
            return False
        return True

    @dbus.service.method(DBUS_INTERFACE_NAME,
                         in_signature='',
                         out_signature='b',
                         sender_keyword='sender',
                         connection_keyword='conn')
    def is_package_system_locked(self, sender=None, conn=None):
        """
        Check if the package system is locked

        Returns True if a write lock is held on DPKG_LOCK (or if that
        file does not exist). Raises PermissionDeniedError if the
        caller is not authorized.
        """
        self._require_auth(sender, conn,
                           "com.ubuntu.systemservice.ispkgsystemlocked")
        # check for file
        # NOTE(review): a missing lock file is reported as "locked";
        # confirm that this is intended
        if not os.path.exists(self.DPKG_LOCK):
            return True
        # check for lock: ask the kernel via F_GETLK what it would
        # answer to a write lock request and look at the lock type
        # (the first field) of the reply
        flk = struct.pack('hhllhl', fcntl.F_WRLCK, 0, 0, 0, 0, 0)
        f = open(self.DPKG_LOCK)
        try:
            rv = fcntl.fcntl(f, fcntl.F_GETLK, flk)
        finally:
            f.close()
        lockv = struct.unpack('hhllhl', rv)[0]
        return (lockv == fcntl.F_WRLCK)
if __name__ == "__main__":
    # make the glib main loop the dbus default before the service
    # connects to the system bus, then serve requests until terminated
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    server = ServiceBackend()
    mainloop = gobject.MainLoop()
    mainloop.run()